// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <foundationdb/fdb_c.h>
#include <foundationdb/fdb_c_options.g.h>
#include <gtest/gtest_prod.h>

#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <thread>
#include <utility>
#include <vector>

#include "txn_kv_error.h"
#include "versionstamp.h"

// =============================================================================
//
// =============================================================================

namespace doris::cloud {

class Transaction;
class RangeGetIterator;
class TxnKv;

// A key selector is used to specify the position of a key in a range query.
enum class RangeKeySelector {
    FIRST_GREATER_OR_EQUAL,
    FIRST_GREATER_THAN,
    LAST_LESS_OR_EQUAL,
    LAST_LESS_THAN
};

struct RangeGetOptions {
    // if true, key range will not be included in txn conflict detection this time.
    bool snapshot = false;
    // if non-zero, indicates the maximum number of key-value pairs to return.
    int batch_limit = 10000;
    // if true, the iterator will return keys in reverse order.
    bool reverse = false;
    // The key selector for the beginning of the range.
    RangeKeySelector begin_key_selector = RangeKeySelector::FIRST_GREATER_OR_EQUAL;
    // The key selector for the end of the range.
    //
    // REMEMBER: The end key is exclusive by default.
    RangeKeySelector end_key_selector = RangeKeySelector::FIRST_GREATER_OR_EQUAL;
};

/**
 * Unlike `RangeGetIterator`, which can only iterate within a page of range, this iterator is
 * capable of iterating over the entire specified range.
 *
 * Usage:
 * for (auto kvp = it.next(); kvp.has_value(); kvp = it.next()) {
 *     auto [k, v] = *kvp;
 * }
 * if (!it.is_valid()) {
 *     return err;
 * }
 */
struct FullRangeGetOptions : public RangeGetOptions {
    std::shared_ptr<TxnKv> txn_kv;
    // Trigger prefetch getting next batch kvs before access them
    bool prefetch = false;
    // If non-zero, indicates the exact number of key-value pairs to return.
    int exact_limit = 0;
    // Reference. If not null, each inner range get is performed through this transaction; otherwise
    // perform each inner range get through a new transaction.
    Transaction* txn = nullptr;
    // If users want to extend the lifespan of the kv pair returned by `next()`, they can pass an
    // object pool to collect the inner iterators that have completed iterated.
    std::vector<std::unique_ptr<RangeGetIterator>>* obj_pool = nullptr;

    FullRangeGetOptions(std::shared_ptr<TxnKv> _txn_kv) : txn_kv(std::move(_txn_kv)) {}
    FullRangeGetOptions() = default;
};

class FullRangeGetIterator {
public:
    FullRangeGetIterator() = default;

    virtual ~FullRangeGetIterator() = default;

    virtual bool is_valid() const = 0;

    virtual bool has_next() = 0;

    virtual TxnErrorCode error_code() const = 0;

    virtual std::optional<std::pair<std::string_view, std::string_view>> next() = 0;

    virtual std::optional<std::pair<std::string_view, std::string_view>> peek() = 0;
};

class TxnKv {
public:
    TxnKv() = default;
    virtual ~TxnKv() = default;

    /**
     * Creates a transaction
     * TODO: add options to create the txn
     *
     * @param txn output param
     * @return TXN_OK for success
     */
    virtual TxnErrorCode create_txn(std::unique_ptr<Transaction>* txn) = 0;

    virtual int init() = 0;

    virtual std::unique_ptr<FullRangeGetIterator> full_range_get(std::string begin, std::string end,
                                                                 FullRangeGetOptions opts) = 0;
};

class Transaction {
public:
    Transaction() = default;
    virtual ~Transaction() = default;

    virtual void put(std::string_view key, std::string_view val) = 0;

    /**
     * @param snapshot if true, `key` will not be included in txn conflict detection this time
     * @return TXN_OK for success get a key, TXN_KEY_NOT_FOUND for key not found, otherwise for error
     */
    virtual TxnErrorCode get(std::string_view key, std::string* val, bool snapshot = false) = 0;

    /**
     * Closed-open range
     * @param begin the begin key, inclusive
     * @param end the end key, exclusive
     * @param iter output param for the iterator to iterate over the key-value pairs in the specified range.
     *             If the range is empty, the iterator will be valid but `has_next()` will return false.
     *             If an error occurs, the iterator will be invalid and `error_code()` will return the error code.
     * @param opts options for range get
     *             - `snapshot`: if true, the range will not be included in txn conflict detection this time.
     *             - `limit`: the maximum number of key-value pairs to return.
     *             - `reverse`: if true, the iterator will return keys in reverse order.
     * @return TXN_OK for success, otherwise for error
     */
    virtual TxnErrorCode get(std::string_view begin, std::string_view end,
                             std::unique_ptr<RangeGetIterator>* iter,
                             const RangeGetOptions& opts) = 0;

    // A convenience method for `get` with default options, to keep backward compatibility.
    TxnErrorCode get(std::string_view begin, std::string_view end,
                     std::unique_ptr<RangeGetIterator>* iter, bool snapshot = false,
                     int limit = 10000) {
        RangeGetOptions opts;
        opts.snapshot = snapshot;
        opts.batch_limit = limit;
        return get(begin, end, iter, opts);
    }

    /**
     * Get a full range of key-value pairs.
     * @param begin the begin key, inclusive
     * @param end the end key, exclusive
     * @param opts options for full range get
     * @return a FullRangeGetIterator for iterating over the key-value pairs in the specified range.
     *         If the range is empty, the iterator will be valid but `has_next()` will return false.
     *         If an error occurs, the iterator will be invalid and `error_code()` will return the error code.
     */
    virtual std::unique_ptr<FullRangeGetIterator> full_range_get(
            std::string_view begin, std::string_view end,
            FullRangeGetOptions opts = FullRangeGetOptions()) = 0;

    /**
     * Put a key-value pair in which key will in the form of
     * `key_prefix + versiontimestamp`. `versiontimestamp` is autogenerated by the
     * system and it's 10-byte long and encoded in big-endian
     *
     * @param key_prefix prefix for key convertion, can be zero-length
     * @param val value
     */
    virtual void atomic_set_ver_key(std::string_view key_prefix, std::string_view val) = 0;

    /**
     * Put a key-value pair in which key will in the form of `key_prefix + versiontimestamp + key_suffix`.
     * `versiontimestamp` is autogenerated by the system and it's 10-byte long and encoded in big-endian.
     *
     * @param key key for conversion, it should at least 10-byte long.
     * @param offset the offset of the versionstamp to place. `offset` + 10 must be less than or equal to
     *               the length of `key`.
     * @param val value
     * @return true for success, otherwise the offset is invalid or the key is too short.
     */
    virtual bool atomic_set_ver_key(std::string_view key, uint32_t offset,
                                    std::string_view val) = 0;

    /**
     * Put a key-value pair in which key will in the form of
     * `value + versiontimestamp`. `versiontimestamp` is autogenerated by the
     * system and it's 10-byte long and encoded in big-endian
     *
     * @param key prefix for key convertion, can be zero-length
     * @param val value
     */
    virtual void atomic_set_ver_value(std::string_view key, std::string_view val) = 0;

    /**
     * Adds a value to database.
     *
     * The default value is zero if no such key exists before.
     *
     * @param to_add positive for addition, negative for substraction
     * @return 0 for success otherwise error
     */
    virtual void atomic_add(std::string_view key, int64_t to_add) = 0;
    // TODO: min max or and xor cmp_and_clear set_ver_value

    /**
     * Decode the atomic value written by `atomic_add`.
     *
     * @param data the data to decode
     * @return true for success, otherwise the data format is invalid.
     */
    virtual bool decode_atomic_int(std::string_view data, int64_t* val) = 0;

    virtual void remove(std::string_view key) = 0;

    /**
     * Remove a closed-open range
     */
    virtual void remove(std::string_view begin, std::string_view end) = 0;

    /**
     *
     *@return TXN_OK for success otherwise error
     */
    virtual TxnErrorCode commit() = 0;

    /**
     * Issue a watch on `key`, it will commit the txn and wait until the watch is triggered.
     *
     * A watch’s behavior is relative to the transaction that created it. A watch will report a change in
     * relation to the key’s value as readable by that transaction. The initial value used for comparison
     * is either that of the transaction’s read version or the value as modified by the transaction itself
     * prior to the creation of the watch. If the value changes and then changes back to its initial value,
     * the watch might not report the change.
     *
     * @return TXN_OK for success otherwise error
     */
    virtual TxnErrorCode watch_key(std::string_view key) = 0;

    /**
     * Gets the read version used by the txn.
     * Note that it does not make any sense we call this function before
     * any `Transaction::get()` is called.
     *
     *@return TXN_OK for success otherwise error
     */
    virtual TxnErrorCode get_read_version(int64_t* version) = 0;

    /**
     * Gets the commited version used by the txn.
     * Note that it does not make any sense we call this function before
     * a successful call to `Transaction::commit()`.
     *
     *@return TXN_OK for success, TXN_CONFLICT for conflict, otherwise error
     */
    virtual TxnErrorCode get_committed_version(int64_t* version) = 0;

    /**
     * Aborts this transaction
     *
     * @return TXN_OK for success otherwise error
     */
    virtual TxnErrorCode abort() = 0;

    struct BatchGetOptions {
        BatchGetOptions() : BatchGetOptions(false) {};
        BatchGetOptions(bool s) : snapshot(s), concurrency(1000) {};

        // if true, `key` will not be included in txn conflict detection this time.
        //
        // Default: false
        bool snapshot;

        // the maximum number of concurrent requests submitted to fdb at one time.
        //
        // Default: 1000
        int concurrency;

        // Used for `batch_scan`, if true, the underlying iterator will return keys in reverse order.
        //
        // Default: false
        bool reverse = false;
    };

    /**
     * @brief batch get keys
     *
     * @param res
     * @param keys
     * @param opts
     * @return If all keys are successfully retrieved, return TXN_OK. Otherwise, return the code of the first occurring error
     */
    virtual TxnErrorCode batch_get(std::vector<std::optional<std::string>>* res,
                                   const std::vector<std::string>& keys,
                                   const BatchGetOptions& opts = BatchGetOptions()) = 0;

    /**
     * @brief Batch scan for the first key-value pair with each given key prefix.
     *
     * For each key prefix in the input key_prefixs, this function scans for keys that start with
     * that prefix in the direction specified by `opts.reverse` (forward by default) and returns
     * the first key-value pair encountered. If no key with the given prefix is found, the
     * corresponding result is an empty optional.
     *
     * The function scans keys in batches and is more efficient than scanning each prefix individually.
     *
     * @param[out] res The output vector of optionals. Each element corresponds to the same index as in the input key_prefixs.
     *                  If a key-value pair with the prefix is found, the element will contain the pair; otherwise, it will be std::nullopt.
     * @param[in] key_prefixs The list of key prefixes to scan for.
     * @param[in] opts Options such as `reverse` and `snapshot`. If `reverse` is true, the scan is in the backward direction.
     *
     * @return TXN_OK if all scans completed successfully. If any error occurs during the scanning,
     *         the function stops immediately and returns the error code of the first error encountered.
     *         Note: The output vector `res` may be partially filled when an error occurs.
     */
    virtual TxnErrorCode batch_scan(
            std::vector<std::optional<std::pair<std::string, std::string>>>* res,
            const std::vector<std::string>& key_prefixs,
            const BatchGetOptions& opts = BatchGetOptions());

    /**
     * @brief Batch scan for the first key-value pairs within each given range.
     *
     * For each range in the input ranges, this function performs a range scan from the begin key
     * (inclusive) to the end key (exclusive) in the direction specified by `opts.reverse` (forward
     * by default) and returns all key-value pairs found within that range. If no keys are found
     * within a range, the corresponding result is an empty optional.
     *
     * The function scans ranges in batches and is more efficient than scanning each range individually.
     *
     * @param[out] res The output vector of optionals. Each element corresponds to the same index as in the input ranges.
     *                  If key-value pairs are found within the range, the element will contain a vector of pairs;
     *                  otherwise, it will be std::nullopt.
     * @param[in] ranges The list of ranges to scan. Each range is a pair of (begin_key, end_key) where
     *                   begin_key is inclusive and end_key is exclusive.
     * @param[in] opts Options such as `reverse` and `snapshot`. If `reverse` is true, the scan is in the backward direction.
     *
     * @return TXN_OK if all scans completed successfully. If any error occurs during the scanning,
     *         the function stops immediately and returns the error code of the first error encountered.
     *         Note: The output vector `res` may be partially filled when an error occurs.
     */
    virtual TxnErrorCode batch_scan(
            std::vector<std::optional<std::pair<std::string, std::string>>>* res,
            const std::vector<std::pair<std::string, std::string>>& ranges,
            const BatchGetOptions& opts = BatchGetOptions()) = 0;

    /**
     * @brief return the approximate bytes consumed by the underlying transaction buffer.
     **/
    virtual size_t approximate_bytes() const = 0;

    /**
     * @brief return the num get keys submitted to this txn.
     **/
    virtual size_t num_get_keys() const = 0;

    /**
     * @brief return the num delete keys submitted to this txn.
     **/
    virtual size_t num_del_keys() const = 0;

    /**
     * @brief return the num put keys submitted to this txn.
     **/
    virtual size_t num_put_keys() const = 0;

    /**
     * @brief return the bytes of the delete keys consumed.
     **/
    virtual size_t delete_bytes() const = 0;

    /**
     * @brief return the bytes of the get values consumed.
     **/
    virtual size_t get_bytes() const = 0;

    /**
     * @brief return the bytes of the put key and values consumed.
     **/
    virtual size_t put_bytes() const = 0;

    /**
     * @brief Enable getting versionstamp for this transaction.
     * Must be called before commit() if you want to get versionstamp.
     **/
    virtual void enable_get_versionstamp() {}

    /**
     * @brief Get the versionstamp used by the transaction.
     * Only available after a successful commit() and when enable_get_versionstamp() was called.
     * @param versionstamp output parameter to store the versionstamp
     * @return TXN_OK for success, TXN_INVALID_ARGUMENT if not enabled, 
     *         TXN_KEY_NOT_FOUND if not available
     **/
    virtual TxnErrorCode get_versionstamp(Versionstamp* versionstamp) {
        return TxnErrorCode::TXN_INVALID_ARGUMENT;
    }
};

class RangeGetIterator {
public:
    RangeGetIterator() = default;
    virtual ~RangeGetIterator() = default;

    /**
     * Checks if we can call `next()` on this iterator.
     */
    virtual bool has_next() const = 0;

    /**
     * Gets next element, this is usually called after a check of `has_next()` succeeds,
     * If `has_next()` is not checked, the return value may be undefined.
     *
     * @return a kv pair
     */
    virtual std::pair<std::string_view, std::string_view> next() = 0;

    /**
     * Gets next element but not advance the cursor, this is usually called after a check of `has_next()` succeeds,
     * If `has_next()` is not checked, the return value may be undefined.
     *
     * @return a kv pair
     */
    virtual std::pair<std::string_view, std::string_view> peek() const = 0;

    /**
     * Repositions the offset to `pos`
     */
    virtual void seek(size_t pos) = 0;

    /**
     * Checks if there are more KVs to be get from the range, caller usually wants
     * to issue another `get` with the last key of this iteration.
     *
     * @return if there are more kvs that this iterator cannot cover
     */
    virtual bool more() const = 0;

    /**
     *
     * Gets size of the range, some kinds of iterators may not support this function.
     *
     * @return size
     */
    virtual int size() const = 0;

    /**
     * Get all FDBKeyValue's bytes include key's bytes
     * RangeGetIterator created by get range, when get range the keys in the range too.
     */
    virtual int64_t get_kv_bytes() const = 0;

    /**
     * Get the remaining size of the range, some kinds of iterators may not support this function.
     *
     * @return size
     */
    virtual int remaining() const = 0;

    /**
     * Resets to initial state, some kinds of iterators may not support this function.
     */
    virtual void reset() = 0;

    /**
     * Get the begin key of the next iterator if `more()` is true, otherwise returns empty string.
     */
    virtual std::string next_begin_key() const = 0;

    /**
     * Get the last key of the iterator, it can be used as the end key of the next iterator when
     * the key selector is FIRST_GREATER_OR_EQUAL.
     *
     * ATTN: This is ONLY used for reverse range get.
     */
    virtual std::string_view last_key() const = 0;

    RangeGetIterator(const RangeGetIterator&) = delete;
    RangeGetIterator& operator=(const RangeGetIterator&) = delete;
};

// =============================================================================
//  FoundationDB implementation of TxnKv
// =============================================================================

namespace fdb {
class Database;
class Transaction;
class Network;
} // namespace fdb

class FdbTxnKv : public TxnKv {
public:
    FdbTxnKv() = default;
    ~FdbTxnKv() override = default;

    TxnErrorCode create_txn(std::unique_ptr<Transaction>* txn) override;
    TxnErrorCode create_txn_with_system_access(std::unique_ptr<Transaction>* txn);

    int init() override;

    std::unique_ptr<FullRangeGetIterator> full_range_get(std::string begin, std::string end,
                                                         FullRangeGetOptions opts) override;

    // Return the partition boundaries of the database.
    TxnErrorCode get_partition_boundaries(std::vector<std::string>* boundaries);

    // Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that
    // the client is saturated. This value is updated every second.
    double get_client_thread_busyness() const;

    static std::string_view fdb_partition_key_prefix() { return "\xff/keyServers/"; }
    static std::string_view fdb_partition_key_end() {
        // '0' is the next byte after '/' in the ASCII table
        return "\xff/keyServers0";
    }

private:
    std::shared_ptr<fdb::Network> network_;
    std::shared_ptr<fdb::Database> database_;
};

namespace fdb {

class Network {
public:
    Network(FDBNetworkOption opt) : opt_(opt) {}

    /**
     * @return 0 for success otherwise non-zero
     */
    int init();

    /**
     * Notify the newwork thread to stop, this is an async. call, check
     * Network::working to ensure the network exited finally.
     *
     * FIXME: may be we can implement it as a sync. function.
     */
    void stop();

    ~Network() = default;

private:
    std::shared_ptr<std::thread> network_thread_;
    FDBNetworkOption opt_;

    // Global state, only one instance of Network is allowed
    static std::atomic<bool> working;
};

class Database {
public:
    Database(std::shared_ptr<Network> net, std::string cluster_file, FDBDatabaseOption opt)
            : network_(std::move(net)), cluster_file_path_(std::move(cluster_file)), opt_(opt) {}

    /**
     *
     * @return 0 for success otherwise false
     */
    int init();

    ~Database() {
        if (db_ != nullptr) fdb_database_destroy(db_);
    }

    FDBDatabase* db() { return db_; };

    std::shared_ptr<Transaction> create_txn(FDBTransactionOption opt);

private:
    std::shared_ptr<Network> network_;
    std::string cluster_file_path_;
    FDBDatabase* db_ = nullptr;
    FDBDatabaseOption opt_;
};

class RangeGetIterator : public cloud::RangeGetIterator {
public:
    /**
     * Iterator takes the ownership of input future
     */
    RangeGetIterator(FDBFuture* fut, bool owns = true)
            : fut_(fut), owns_fut_(owns), kvs_(nullptr), kvs_size_(-1), more_(false), idx_(-1) {}

    RangeGetIterator(RangeGetIterator&& o) {
        if (fut_ && owns_fut_) fdb_future_destroy(fut_);
        fut_ = o.fut_;
        owns_fut_ = o.owns_fut_;
        kvs_ = o.kvs_;
        kvs_size_ = o.kvs_size_;
        more_ = o.more_;
        idx_ = o.idx_;

        o.fut_ = nullptr;
        o.kvs_ = nullptr;
        o.idx_ = 0;
        o.kvs_size_ = 0;
        o.more_ = false;
    }

    ~RangeGetIterator() override {
        // Release all memory
        if (fut_ && owns_fut_) fdb_future_destroy(fut_);
    }

    TxnErrorCode init();

    std::pair<std::string_view, std::string_view> next() override {
        if (idx_ < 0 || idx_ >= kvs_size_) return {};
        const auto& kv = kvs_[idx_++];
        return {{(char*)kv.key, (size_t)kv.key_length}, {(char*)kv.value, (size_t)kv.value_length}};
    }

    std::pair<std::string_view, std::string_view> peek() const override {
        if (idx_ < 0 || idx_ >= kvs_size_) return {};
        const auto& kv = kvs_[idx_];
        return {{(char*)kv.key, (size_t)kv.key_length}, {(char*)kv.value, (size_t)kv.value_length}};
    }

    void seek(size_t pos) override { idx_ = pos; }

    bool has_next() const override { return (idx_ < kvs_size_); }

    /**
     * Check if there are more KVs to be get from the range, caller usually wants
     * to issue a nother `get` with the last key of this iteration.
     */
    bool more() const override { return more_; }

    int size() const override { return kvs_size_; }

    int64_t get_kv_bytes() const override {
        int64_t total_bytes {};
        for (int i = 0; i < kvs_size_; i++) {
            total_bytes += kvs_[i].key_length + kvs_[i].value_length;
        }
        return total_bytes;
    }

    int remaining() const override {
        if (idx_ < 0 || idx_ >= kvs_size_) return 0;
        return kvs_size_ - idx_;
    }

    void reset() override { idx_ = 0; }

    std::string next_begin_key() const override {
        std::string k;
        if (!more()) return k;
        const auto& kv = kvs_[kvs_size_ - 1];
        k.reserve((size_t)kv.key_length + 1);
        k.append((char*)kv.key, (size_t)kv.key_length);
        k.push_back('\x00');
        return k;
    }

    std::string_view last_key() const override {
        if (kvs_size_ <= 0) return {};
        const auto& kv = kvs_[kvs_size_ - 1];
        return {(char*)kv.key, (size_t)kv.key_length};
    }

    RangeGetIterator(const RangeGetIterator&) = delete;
    RangeGetIterator& operator=(const RangeGetIterator&) = delete;

private:
    FDBFuture* fut_;
    bool owns_fut_;
    const FDBKeyValue* kvs_;
    int kvs_size_;
    fdb_bool_t more_;
    int idx_;
};

class Transaction : public cloud::Transaction {
    FRIEND_TEST(TxnKvTest, ReportConflictingRange);
    FRIEND_TEST(TxnKvTest, VersionedGetConflictRange);

public:
    friend class Database;
    friend class FullRangeGetIterator;

    Transaction(std::shared_ptr<Database> db) : db_(std::move(db)) {}

    ~Transaction() override {
        if (txn_) fdb_transaction_destroy(txn_);
    }

    /**
     *
     * @return TxnErrorCode for success otherwise false
     */
    TxnErrorCode init();
    TxnErrorCode enable_access_system_keys();

    void put(std::string_view key, std::string_view val) override;

    using cloud::Transaction::get;
    /**
     * @param snapshot if true, `key` will not be included in txn conflict detection this time
     * @return TXN_OK for success get a key, TXN_KEY_NOT_FOUND for key not found, otherwise for error
     */
    TxnErrorCode get(std::string_view key, std::string* val, bool snapshot = false) override;
    /**
     * Closed-open range
     * @param begin the begin key, inclusive
     * @param end the end key, exclusive
     * @param iter output param for the iterator to iterate over the key-value pairs in the specified range.
     *             If the range is empty, the iterator will be valid but `has_next()` will return false.
     *             If an error occurs, the iterator will be invalid and `error_code()` will return the error code.
     * @param opts options for range get
     *             - `snapshot`: if true, the range will not be included in txn conflict detection this time.
     *             - `limit`: the maximum number of key-value pairs to return.
     *             - `reverse`: if true, the iterator will return keys in reverse order.
     * @return TXN_OK for success, otherwise for error
     */
    TxnErrorCode get(std::string_view begin, std::string_view end,
                     std::unique_ptr<cloud::RangeGetIterator>* iter,
                     const RangeGetOptions& opts) override;

    std::unique_ptr<cloud::FullRangeGetIterator> full_range_get(
            std::string_view begin, std::string_view end,
            FullRangeGetOptions opts = FullRangeGetOptions()) override;

    /**
     * Put a key-value pair in which key will in the form of
     * `key_prefix + versiontimestamp`. `versiontimestamp` is autogenerated by the
     * system and it's 10-byte long and encoded in big-endian
     *
     * @param key_prefix prefix for key convertion, can be zero-length
     * @param val value
     */
    void atomic_set_ver_key(std::string_view key_prefix, std::string_view val) override;

    /**
     * Put a key-value pair in which key will in the form of `key_prefix + versiontimestamp + key_suffix`.
     *
     * @param key key for conversion, it should at least 10-byte long.
     * @param offset the offset of the versionstamp to place. `offset` + 10 must be less than or equal to
     *               the length of `key`.
     * @param val value
     * @return true for success, otherwise the offset is invalid or the key is too short.
     */
    bool atomic_set_ver_key(std::string_view key, uint32_t offset, std::string_view val) override;

    /**
     * Put a key-value pair in which key will in the form of
     * `value + versiontimestamp`. `versiontimestamp` is autogenerated by the
     * system and it's 10-byte long and encoded in big-endian
     *
     * @param key prefix for key convertion, can be zero-length
     * @param val value
     */
    void atomic_set_ver_value(std::string_view key, std::string_view val) override;

    /**
     * Adds a value to database
     * @param to_add positive for addition, negative for substraction
     */
    void atomic_add(std::string_view key, int64_t to_add) override;
    // TODO: min max or and xor cmp_and_clear set_ver_value

    bool decode_atomic_int(std::string_view data, int64_t* val) override;

    void remove(std::string_view key) override;

    /**
     * Remove a closed-open range
     */
    void remove(std::string_view begin, std::string_view end) override;

    /**
     *
     *@return TXN_OK for success, TXN_CONFLICT for conflict, otherwise for error
     */
    TxnErrorCode commit() override;

    TxnErrorCode watch_key(std::string_view key) override;

    TxnErrorCode get_read_version(int64_t* version) override;
    TxnErrorCode get_committed_version(int64_t* version) override;

    TxnErrorCode abort() override;

    void enable_get_versionstamp() override;

    TxnErrorCode get_versionstamp(Versionstamp* versionstamp) override;

    TxnErrorCode batch_get(std::vector<std::optional<std::string>>* res,
                           const std::vector<std::string>& keys,
                           const BatchGetOptions& opts = BatchGetOptions()) override;

    TxnErrorCode batch_scan(std::vector<std::optional<std::pair<std::string, std::string>>>* res,
                            const std::vector<std::pair<std::string, std::string>>& ranges,
                            const BatchGetOptions& opts = BatchGetOptions()) override;

    size_t approximate_bytes() const override { return approximate_bytes_; }

    size_t num_get_keys() const override { return num_get_keys_; }

    size_t num_del_keys() const override { return num_del_keys_; }

    size_t num_put_keys() const override { return num_put_keys_; }

    size_t delete_bytes() const override { return delete_bytes_; }

    size_t put_bytes() const override { return put_bytes_; }

    size_t get_bytes() const override { return get_bytes_; }

private:
    // Return the conflicting range when the transaction commit returns TXN_CONFLICT.
    //
    // It only works when the report_conflicting_ranges option is enabled.
    TxnErrorCode get_conflicting_range(
            std::vector<std::pair<std::string, std::string>>* key_values);
    TxnErrorCode report_conflicting_range();

    std::shared_ptr<Database> db_ {nullptr};
    bool commited_ = false;
    bool aborted_ = false;
    FDBTransaction* txn_ = nullptr;

    size_t num_get_keys_ {0};
    size_t num_del_keys_ {0};
    size_t num_put_keys_ {0};
    size_t delete_bytes_ {0};
    size_t get_bytes_ {0};
    size_t put_bytes_ {0};
    size_t approximate_bytes_ {0};

    bool versionstamp_enabled_ {false};
    Versionstamp versionstamp_result_;
};

class FullRangeGetIterator final : public cloud::FullRangeGetIterator {
public:
    FullRangeGetIterator(std::string begin, std::string end, FullRangeGetOptions opts);

    ~FullRangeGetIterator() override;

    bool is_valid() const override { return code_ == TxnErrorCode::TXN_OK; }

    bool has_next() override;

    TxnErrorCode error_code() const override { return code_; }

    std::optional<std::pair<std::string_view, std::string_view>> next() override;

    std::optional<std::pair<std::string_view, std::string_view>> peek() override;

private:
    // Set `is_valid_` to false if meet any error
    void init();

    // Await `fut_` and create new inner iter.
    // Set `is_valid_` to false if meet any error
    void await_future();

    // Perform a paginate range get asynchronously and set `fut_`.
    // Set `is_valid_` to false if meet any error
    void async_inner_get(std::string_view begin, std::string_view end);
    void async_get_next_batch();

    bool prefetch();

    FullRangeGetOptions opts_;

    TxnErrorCode code_ = TxnErrorCode::TXN_OK;
    int num_consumed_ = 0;
    std::string begin_;
    std::string end_;
    std::unique_ptr<Transaction> txn_;
    std::unique_ptr<RangeGetIterator> inner_iter_;
    FDBFuture* fut_ = nullptr;
};

} // namespace fdb

} // namespace doris::cloud
